/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.core.client.impl;

import com.chinamobile.cmss.lakehouse.common.kubernetes.K8sModelConstant;
import com.chinamobile.cmss.lakehouse.common.utils.Actions;
import com.chinamobile.cmss.lakehouse.core.client.IKubernetesClient;
import com.chinamobile.cmss.lakehouse.core.client.InformerCreateEvent;
import com.chinamobile.cmss.lakehouse.core.client.InformerDeleteEvent;
import com.chinamobile.cmss.lakehouse.core.client.InformerEventType;
import com.chinamobile.cmss.lakehouse.core.client.InformerException;
import com.chinamobile.cmss.lakehouse.core.client.InformerFactory;
import com.chinamobile.cmss.lakehouse.core.client.InformerUtils;
import com.chinamobile.cmss.lakehouse.core.config.KubernetesConfiguration;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.informer.SharedIndexInformer;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.informer.cache.Lister;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.BatchV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.ExtensionsV1beta1Api;
import io.kubernetes.client.openapi.apis.RbacAuthorizationV1Api;
import io.kubernetes.client.openapi.models.V1ClusterRoleBinding;
import io.kubernetes.client.openapi.models.V1ClusterRoleBindingList;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1ConfigMapList;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1DeploymentList;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1NamespaceList;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceList;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1StatefulSetList;
import io.kubernetes.client.util.CallGeneratorParams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@VisibleForTesting
@Component
public class InformerFactoryImpl implements InformerFactory {

    // Sync futures of the informers registered so far, keyed by InformerEventType#name().
    // The getOrAdd* methods go through computeIfAbsent on this map so that each informer type
    // is created only once.
    protected final ConcurrentHashMap<String, CompletableFuture<Void>> informerContainers =
        new ConcurrentHashMap<>();
    // Pending create events looked up by the informer handlers. Handlers match the map key against
    // the bare resource name (namespace, cluster role binding) or against
    // InformerUtils.wrap(namespace, name) (service, stateful set, deployment, config map).
    protected final ConcurrentHashMap<String, InformerCreateEvent> createEventContainers =
        new ConcurrentHashMap<>();
    // Pending delete events looked up by the onDelete handlers; keys follow the same scheme as
    // createEventContainers.
    protected final ConcurrentHashMap<String, InformerDeleteEvent> deleteEventContainers =
        new ConcurrentHashMap<>();
    // Label selector handed to the namespace, cluster role binding, service, stateful set and
    // deployment list/watch calls (the config map call passes none).
    private final String labelSelector = K8sModelConstant.LABEL_SELECTOR;
    // Factory that owns, starts and stops every informer created by this class.
    private SharedInformerFactory sharedInformerFactory;
    // Typed API facades, all assigned in the constructor.
    private AppsV1Api appsV1Api;
    private CoreV1Api coreV1Api;
    // NOTE(review): batchV1Api and extensionsV1beta1Api are not referenced in the part of the class
    // reviewed here - confirm they are still needed.
    private BatchV1Api batchV1Api;
    private ExtensionsV1beta1Api extensionsV1beta1Api;
    private RbacAuthorizationV1Api rbacAuthorizationV1Api;

    /**
     * Creates the shared informer factory and the typed API facades used by the informers.
     *
     * <p>Every facade, including the RBAC one, is bound to the {@link ApiClient} obtained from
     * {@code kubernetesClient}, so all informers talk to the same cluster with the same
     * credentials instead of falling back to the process-wide default client.
     *
     * <p>Note that the read timeout is changed on the very {@link ApiClient} instance returned by
     * {@code kubernetesClient.getApiClient()}.
     *
     * @param kubernetesClient provider of the configured {@link ApiClient}
     */
    @Autowired
    public InformerFactoryImpl(IKubernetesClient kubernetesClient) {
        ApiClient apiClient = kubernetesClient.getApiClient();
        // fix memory leak reported in https://github.com/kubernetes-client/java/issues/912
        apiClient.setReadTimeout(0);
        this.sharedInformerFactory = new SharedInformerFactory(apiClient);
        this.appsV1Api = new AppsV1Api(apiClient);
        this.coreV1Api = new CoreV1Api(apiClient);
        this.batchV1Api = new BatchV1Api(apiClient);
        this.extensionsV1beta1Api = new ExtensionsV1beta1Api(apiClient);
        // Bind the RBAC facade to the same client as its siblings; the no-arg constructor would
        // silently use the global default ApiClient.
        this.rbacAuthorizationV1Api = new RbacAuthorizationV1Api(apiClient);
    }

    /**
     * Returns the sync future of the namespace informer, creating and starting the informer on
     * the first call.
     *
     * <p>The future is stored in {@code informerContainers} under
     * {@code InformerEventType.NAMESPACE.name()}; later calls receive that same future, whether
     * it completed normally or exceptionally.
     *
     * @return future completed when the sync callback reports {@code initializeComplete}, or
     *     completed exceptionally with the {@link InformerException} passed to
     *     {@code initializeFailed}
     */
    @Override
    public CompletableFuture<Void> getOrAddNamespaceInformer() {
        final String cacheKey = InformerEventType.NAMESPACE.name();
        return informerContainers.computeIfAbsent(cacheKey, key -> {
            final CompletableFuture<Void> synced = new CompletableFuture<>();

            // List/watch namespaces selected by the label selector.
            final SharedIndexInformer<V1Namespace> informer =
                sharedInformerFactory.sharedIndexInformerFor(
                    (CallGeneratorParams params) -> coreV1Api.listNamespaceCall(null, null, null,
                        null, labelSelector, null, params.resourceVersion, params.timeoutSeconds,
                        params.watch, null),
                    V1Namespace.class, V1NamespaceList.class);
            informer.addEventHandler(createNamespaceHandler());

            // Handler is attached; now start every informer registered with the factory.
            sharedInformerFactory.startAllRegisteredInformers();

            // Bridge the sync outcome onto the future handed back to callers.
            final InformerInitializeCallback bridge = new InformerInitializeCallback() {
                @Override
                public void initializeComplete() {
                    synced.complete(null);
                }

                @Override
                public void initializeFailed(InformerException e) {
                    synced.completeExceptionally(e);
                }
            };
            syncInformer(informer, V1Namespace.class, bridge);
            return synced;
        });
    }

    /**
     * Builds the event handler attached to the namespace informer.
     *
     * <p>Add and delete notifications are matched against the pending events held in
     * {@code createEventContainers} / {@code deleteEventContainers}: an event is picked when its
     * type is {@code InformerEventType.NAMESPACE} and its map key equals the namespace name.
     * Every picked event is marked successful, receives the namespace as its result and is
     * released. Updates are only logged at debug level.
     *
     * @return handler to register on the namespace informer
     */
    @SuppressWarnings("unchecked")
    private ResourceEventHandler<V1Namespace> createNamespaceHandler() {
        return new ResourceEventHandler<V1Namespace>() {
            @Override
            public void onAdd(V1Namespace v1namespace) {
                final String name = v1namespace.getMetadata().getName();
                final List<InformerCreateEvent> pending = createEventContainers.entrySet().stream()
                    .filter(e -> InformerEventType.NAMESPACE.equals(e.getValue().getType())
                        && e.getKey().equals(name))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                for (InformerCreateEvent event : pending) {
                    log.info("{} namespace added!", name);
                    event.setStatus(InformerCreateEvent.CreateStatus.CREATE_SUCCESS);
                    event.setResult(v1namespace);
                    event.release();
                }
            }

            @Override
            public void onUpdate(V1Namespace oldNamespace, V1Namespace newNamespace) {
                final String before = oldNamespace.getMetadata().getName();
                final String after = newNamespace.getMetadata().getName();
                log.debug("{} => {} namespace updated!", before, after);
            }

            @Override
            public void onDelete(V1Namespace v1namespace, boolean deletedFinalStateUnknown) {
                final String name = v1namespace.getMetadata().getName();
                final List<InformerDeleteEvent> pending = deleteEventContainers.entrySet().stream()
                    .filter(e -> InformerEventType.NAMESPACE.equals(e.getValue().getType())
                        && e.getKey().equals(name))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                for (InformerDeleteEvent event : pending) {
                    log.info("{} namespace deleted!", name);
                    event.setStatus(InformerDeleteEvent.DeleteStatus.DELETE_SUCCESS);
                    event.setResult(v1namespace);
                    event.release();
                }
            }

        };
    }

    /**
     * Returns the sync future of the cluster role binding informer, creating and starting the
     * informer on the first call.
     *
     * <p>The future is stored in {@code informerContainers} under
     * {@code InformerEventType.CLUSTER_ROLE_BINDING.name()}; later calls receive that same
     * future, whether it completed normally or exceptionally.
     *
     * @return future completed when the sync callback reports {@code initializeComplete}, or
     *     completed exceptionally with the {@link InformerException} passed to
     *     {@code initializeFailed}
     */
    @Override
    public CompletableFuture<Void> getOrAddClusterRoleBindingInformer() {
        final String cacheKey = InformerEventType.CLUSTER_ROLE_BINDING.name();
        return informerContainers.computeIfAbsent(cacheKey, key -> {
            final CompletableFuture<Void> synced = new CompletableFuture<>();

            // List/watch cluster role bindings selected by the label selector.
            final SharedIndexInformer<V1ClusterRoleBinding> informer =
                sharedInformerFactory.sharedIndexInformerFor(
                    (CallGeneratorParams params) -> rbacAuthorizationV1Api
                        .listClusterRoleBindingCall(null, null, null, null, labelSelector, null,
                            params.resourceVersion, params.timeoutSeconds, params.watch, null),
                    V1ClusterRoleBinding.class, V1ClusterRoleBindingList.class);
            informer.addEventHandler(createClusterRoleBindingHandler());

            // Handler is attached; now start every informer registered with the factory.
            sharedInformerFactory.startAllRegisteredInformers();

            // Bridge the cluster role binding informer's sync outcome onto the returned future.
            final InformerInitializeCallback bridge = new InformerInitializeCallback() {
                @Override
                public void initializeComplete() {
                    synced.complete(null);
                }

                @Override
                public void initializeFailed(InformerException e) {
                    synced.completeExceptionally(e);
                }
            };
            syncInformer(informer, V1ClusterRoleBinding.class, bridge);
            return synced;
        });
    }

    /**
     * Builds the event handler attached to the cluster role binding informer.
     *
     * <p>Add and delete notifications are matched against the pending events held in
     * {@code createEventContainers} / {@code deleteEventContainers}: an event is picked when its
     * type is {@code InformerEventType.CLUSTER_ROLE_BINDING} and its map key equals the binding
     * name. Every picked event is marked successful, receives the binding as its result and is
     * released. Updates are only logged at debug level.
     *
     * @return handler to register on the cluster role binding informer
     */
    @SuppressWarnings("unchecked")
    private ResourceEventHandler<V1ClusterRoleBinding> createClusterRoleBindingHandler() {
        return new ResourceEventHandler<V1ClusterRoleBinding>() {
            @Override
            public void onAdd(V1ClusterRoleBinding v1ClusterRoleBinding) {
                final String name = v1ClusterRoleBinding.getMetadata().getName();
                final List<InformerCreateEvent> pending = createEventContainers.entrySet().stream()
                    .filter(e -> InformerEventType.CLUSTER_ROLE_BINDING.equals(e.getValue().getType())
                        && e.getKey().equals(name))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                for (InformerCreateEvent event : pending) {
                    log.info("{} clusterRoleBinding added!", name);
                    event.setStatus(InformerCreateEvent.CreateStatus.CREATE_SUCCESS);
                    event.setResult(v1ClusterRoleBinding);
                    event.release();
                }
            }

            @Override
            public void onUpdate(V1ClusterRoleBinding oldClusterRoleBinding,
                                 V1ClusterRoleBinding newClusterRoleBinding) {
                final String before = oldClusterRoleBinding.getMetadata().getName();
                final String after = newClusterRoleBinding.getMetadata().getName();
                log.debug("{} => {} clusterRoleBinding updated!", before, after);
            }

            @Override
            public void onDelete(V1ClusterRoleBinding v1ClusterRoleBinding,
                                 boolean deletedFinalStateUnknown) {
                final String name = v1ClusterRoleBinding.getMetadata().getName();
                final List<InformerDeleteEvent> pending = deleteEventContainers.entrySet().stream()
                    .filter(e -> InformerEventType.CLUSTER_ROLE_BINDING.equals(e.getValue().getType())
                        && e.getKey().equals(name))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                for (InformerDeleteEvent event : pending) {
                    log.info("{} clusterRoleBinding deleted!", name);
                    event.setStatus(InformerDeleteEvent.DeleteStatus.DELETE_SUCCESS);
                    event.setResult(v1ClusterRoleBinding);
                    event.release();
                }
            }

        };
    }

    /**
     * Returns the sync future of the service informer, creating and starting the informer on the
     * first call.
     *
     * <p>The future is stored in {@code informerContainers} under
     * {@code InformerEventType.SERVICE.name()}; later calls receive that same future, whether it
     * completed normally or exceptionally.
     *
     * @return future completed when the sync callback reports {@code initializeComplete}, or
     *     completed exceptionally with the {@link InformerException} passed to
     *     {@code initializeFailed}
     */
    @Override
    public CompletableFuture<Void> getOrAddServiceInformer() {
        final String cacheKey = InformerEventType.SERVICE.name();
        return informerContainers.computeIfAbsent(cacheKey, key -> {
            final CompletableFuture<Void> synced = new CompletableFuture<>();

            // List/watch services of all namespaces selected by the label selector.
            final SharedIndexInformer<V1Service> informer =
                sharedInformerFactory.sharedIndexInformerFor(
                    (CallGeneratorParams params) -> coreV1Api.listServiceForAllNamespacesCall(null,
                        null, null, labelSelector, null, null, params.resourceVersion,
                        params.timeoutSeconds, params.watch, null),
                    V1Service.class, V1ServiceList.class);
            informer.addEventHandler(createServiceHandler());

            // Handler is attached; now start every informer registered with the factory.
            sharedInformerFactory.startAllRegisteredInformers();

            // Bridge the service informer's sync outcome onto the returned future.
            final InformerInitializeCallback bridge = new InformerInitializeCallback() {
                @Override
                public void initializeComplete() {
                    synced.complete(null);
                }

                @Override
                public void initializeFailed(InformerException e) {
                    synced.completeExceptionally(e);
                }
            };
            syncInformer(informer, V1Service.class, bridge);
            return synced;
        });
    }

    /**
     * Builds the event handler attached to the service informer.
     *
     * <p>Add and delete notifications are matched against the pending events held in
     * {@code createEventContainers} / {@code deleteEventContainers}: an event is picked when its
     * type is {@code InformerEventType.SERVICE} and its map key equals
     * {@code InformerUtils.wrap(namespace, name)} of the service. Every picked event is marked
     * successful, receives the service as its result and is released. Updates are only logged at
     * debug level.
     *
     * @return handler to register on the service informer
     */
    @SuppressWarnings("unchecked")
    private ResourceEventHandler<V1Service> createServiceHandler() {
        return new ResourceEventHandler<V1Service>() {
            @Override
            public void onAdd(V1Service v1Service) {
                final String name = v1Service.getMetadata().getName();
                final String lookupKey =
                    InformerUtils.wrap(v1Service.getMetadata().getNamespace(), name);
                final List<InformerCreateEvent> pending = createEventContainers.entrySet().stream()
                    .filter(e -> InformerEventType.SERVICE.equals(e.getValue().getType())
                        && e.getKey().equals(lookupKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                for (InformerCreateEvent event : pending) {
                    log.info("{} service added in namespace {}!", name,
                        v1Service.getMetadata().getNamespace());
                    event.setStatus(InformerCreateEvent.CreateStatus.CREATE_SUCCESS);
                    event.setResult(v1Service);
                    event.release();
                }
            }

            @Override
            public void onUpdate(V1Service oldService, V1Service newService) {
                final String before = oldService.getMetadata().getName();
                final String after = newService.getMetadata().getName();
                final String namespace = newService.getMetadata().getNamespace();
                log.debug("{} => {} service updated in namespace {}!", before, after, namespace);
            }

            @Override
            public void onDelete(V1Service v1Service, boolean deletedFinalStateUnknown) {
                final String name = v1Service.getMetadata().getName();
                final String lookupKey =
                    InformerUtils.wrap(v1Service.getMetadata().getNamespace(), name);
                final List<InformerDeleteEvent> pending = deleteEventContainers.entrySet().stream()
                    .filter(e -> InformerEventType.SERVICE.equals(e.getValue().getType())
                        && e.getKey().equals(lookupKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                for (InformerDeleteEvent event : pending) {
                    log.info("{} service deleted in namespace {}!", name,
                        v1Service.getMetadata().getNamespace());
                    event.setStatus(InformerDeleteEvent.DeleteStatus.DELETE_SUCCESS);
                    event.setResult(v1Service);
                    event.release();
                }
            }

        };
    }

    /**
     * Returns the sync future of the stateful set informer, creating and starting the informer
     * on the first call.
     *
     * <p>The future is stored in {@code informerContainers} under
     * {@code InformerEventType.STATEFUL_SET.name()}; later calls receive that same future,
     * whether it completed normally or exceptionally.
     *
     * @return future completed when the sync callback reports {@code initializeComplete}, or
     *     completed exceptionally with the {@link InformerException} passed to
     *     {@code initializeFailed}
     */
    @Override
    public CompletableFuture<Void> getOrAddStatefulSetInformer() {
        final String cacheKey = InformerEventType.STATEFUL_SET.name();
        return informerContainers.computeIfAbsent(cacheKey, key -> {
            final CompletableFuture<Void> synced = new CompletableFuture<>();

            // List/watch stateful sets of all namespaces selected by the label selector.
            final SharedIndexInformer<V1StatefulSet> informer =
                sharedInformerFactory.sharedIndexInformerFor(
                    (CallGeneratorParams params) -> appsV1Api.listStatefulSetForAllNamespacesCall(
                        null, null, null, labelSelector, null, null, params.resourceVersion,
                        params.timeoutSeconds, params.watch, null),
                    V1StatefulSet.class, V1StatefulSetList.class);
            informer.addEventHandler(createStatefulSetHandler());

            // Handler is attached; now start every informer registered with the factory.
            sharedInformerFactory.startAllRegisteredInformers();

            // Bridge the stateful set informer's sync outcome onto the returned future.
            final InformerInitializeCallback bridge = new InformerInitializeCallback() {
                @Override
                public void initializeComplete() {
                    synced.complete(null);
                }

                @Override
                public void initializeFailed(InformerException e) {
                    synced.completeExceptionally(e);
                }
            };
            syncInformer(informer, V1StatefulSet.class, bridge);
            return synced;
        });
    }

    /**
     * Returns the sync future of the deployment informer, creating and starting the informer on
     * the first call.
     *
     * <p>The future is stored in {@code informerContainers} under
     * {@code InformerEventType.DEPLOYMENT.name()}; later calls receive that same future, whether
     * it completed normally or exceptionally.
     *
     * @return future completed when the sync callback reports {@code initializeComplete}, or
     *     completed exceptionally with the {@link InformerException} passed to
     *     {@code initializeFailed}
     */
    @Override
    public CompletableFuture<Void> getOrAddDeploymentInformer() {
        final String cacheKey = InformerEventType.DEPLOYMENT.name();
        return informerContainers.computeIfAbsent(cacheKey, key -> {
            final CompletableFuture<Void> synced = new CompletableFuture<>();

            // List/watch deployments of all namespaces selected by the label selector.
            final SharedIndexInformer<V1Deployment> informer =
                sharedInformerFactory.sharedIndexInformerFor(
                    (CallGeneratorParams params) -> appsV1Api.listDeploymentForAllNamespacesCall(
                        null, null, null, labelSelector, null, null, params.resourceVersion,
                        params.timeoutSeconds, params.watch, null),
                    V1Deployment.class, V1DeploymentList.class);
            informer.addEventHandler(createDeploymentHandler());

            // Handler is attached; now start every informer registered with the factory.
            sharedInformerFactory.startAllRegisteredInformers();

            // Bridge the deployment informer's sync outcome onto the returned future.
            final InformerInitializeCallback bridge = new InformerInitializeCallback() {
                @Override
                public void initializeComplete() {
                    synced.complete(null);
                }

                @Override
                public void initializeFailed(InformerException e) {
                    synced.completeExceptionally(e);
                }
            };
            syncInformer(informer, V1Deployment.class, bridge);
            return synced;
        });
    }

    /**
     * Builds the event handler attached to the stateful set informer.
     *
     * <p>Pending events are matched by type {@code InformerEventType.STATEFUL_SET} and by a map
     * key equal to {@code InformerUtils.wrap(namespace, name)} of the notified stateful set.
     * <ul>
     *   <li>On add, each matching create event is released only after
     *       {@code checkStatefulSetStatus} finishes: with {@code CREATE_SUCCESS} and the stateful
     *       set as result when the check succeeds, or with {@code CREATE_ERROR} and an
     *       {@link InformerException} when it fails. The failure cause is logged, because the
     *       exception attached to the event only carries a message.</li>
     *   <li>On update, the change is only logged at debug level.</li>
     *   <li>On delete, each matching delete event is marked {@code DELETE_SUCCESS}, receives the
     *       stateful set as result and is released.</li>
     * </ul>
     *
     * @return handler to register on the stateful set informer
     */
    @SuppressWarnings("unchecked")
    private ResourceEventHandler<V1StatefulSet> createStatefulSetHandler() {
        return new ResourceEventHandler<V1StatefulSet>() {
            @Override
            public void onAdd(V1StatefulSet v1StatefulSet) {
                String statefulSetName = v1StatefulSet.getMetadata().getName();
                String namespaceName = v1StatefulSet.getMetadata().getNamespace();
                String addedWrapStatefulSet = InformerUtils.wrap(namespaceName, statefulSetName);
                List<InformerCreateEvent> createEvents = createEventContainers.entrySet().stream()
                    .filter(entry -> InformerEventType.STATEFUL_SET.equals(entry.getValue().getType()))
                    .filter(k -> k.getKey().equals(addedWrapStatefulSet)).map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                createEvents.forEach(ice -> {
                    log.info("{} statefulset added in namespace {}!", statefulSetName, namespaceName);
                    // Ensure the statefulset status is ready
                    checkStatefulSetStatus(namespaceName, statefulSetName).thenAccept(ignored -> {
                        ice.setStatus(InformerCreateEvent.CreateStatus.CREATE_SUCCESS);
                        ice.setResult(v1StatefulSet);
                        ice.release();
                    }).exceptionally(exception -> {
                        String errMsg =
                            String.format("Check status for statefulset [%s] in namespace [%s] error",
                                statefulSetName, namespaceName);
                        // Keep the root cause visible; the event's exception has the message only.
                        log.error(errMsg, exception);
                        ice.setStatus(InformerCreateEvent.CreateStatus.CREATE_ERROR);
                        ice.setException(new InformerException(errMsg));
                        ice.release();
                        return null;
                    });

                });
            }

            @Override
            public void onUpdate(V1StatefulSet oldStatefulSet, V1StatefulSet newStatefulSet) {
                log.debug("{} => {} statefulset updated in namespace {}!",
                    oldStatefulSet.getMetadata().getName(), newStatefulSet.getMetadata().getName(),
                    newStatefulSet.getMetadata().getNamespace());
            }

            @Override
            public void onDelete(V1StatefulSet v1StatefulSet, boolean deletedFinalStateUnknown) {
                String statefulSetName = v1StatefulSet.getMetadata().getName();
                String namespaceName = v1StatefulSet.getMetadata().getNamespace();
                String deletedWrapStatefulSet = InformerUtils.wrap(namespaceName, statefulSetName);
                List<InformerDeleteEvent> deleteEvents = deleteEventContainers.entrySet().stream()
                    .filter(entry -> InformerEventType.STATEFUL_SET.equals(entry.getValue().getType()))
                    .filter(k -> k.getKey().equals(deletedWrapStatefulSet)).map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                deleteEvents.forEach(ide -> {
                    log.info("{} statefulset deleted in namespace {}!", statefulSetName,
                        namespaceName);
                    ide.setStatus(InformerDeleteEvent.DeleteStatus.DELETE_SUCCESS);
                    ide.setResult(v1StatefulSet);
                    ide.release();
                });
            }

        };
    }

    /**
     * Builds the event handler attached to the deployment informer.
     *
     * <p>Pending events are matched by type {@code InformerEventType.DEPLOYMENT} and by a map key
     * equal to {@code InformerUtils.wrap(namespace, name)} of the notified deployment.
     * <ul>
     *   <li>On add and on update, each matching create event is released only after
     *       {@code checkDeploymentStatus} finishes: with {@code CREATE_SUCCESS} and the notified
     *       deployment as result when the check succeeds, or with {@code CREATE_ERROR} and an
     *       {@link InformerException} when it fails. The failure cause is logged, because the
     *       exception attached to the event only carries a message.</li>
     *   <li>On delete, each matching delete event is marked {@code DELETE_SUCCESS}, receives the
     *       deployment as result and is released.</li>
     * </ul>
     *
     * @return handler to register on the deployment informer
     */
    @SuppressWarnings("unchecked")
    private ResourceEventHandler<V1Deployment> createDeploymentHandler() {
        return new ResourceEventHandler<V1Deployment>() {
            @Override
            public void onAdd(V1Deployment v1Deployment) {
                releaseCreateEventsWhenReady(v1Deployment, "added");
            }

            @Override
            public void onUpdate(V1Deployment oldDeployment, V1Deployment newDeployment) {
                releaseCreateEventsWhenReady(newDeployment, "updated");
            }

            @Override
            public void onDelete(V1Deployment v1Deployment, boolean deletedFinalStateUnknown) {
                String deploymentName = v1Deployment.getMetadata().getName();
                String namespaceName = v1Deployment.getMetadata().getNamespace();
                String deletedWrapDeployment = InformerUtils.wrap(namespaceName, deploymentName);
                List<InformerDeleteEvent> deleteEvents = deleteEventContainers.entrySet().stream()
                    .filter(entry -> InformerEventType.DEPLOYMENT.equals(entry.getValue().getType()))
                    .filter(k -> k.getKey().equals(deletedWrapDeployment)).map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                deleteEvents.forEach(ide -> {
                    log.info("{} deployment deleted in namespace {}!", deploymentName,
                        namespaceName);
                    ide.setStatus(InformerDeleteEvent.DeleteStatus.DELETE_SUCCESS);
                    ide.setResult(v1Deployment);
                    ide.release();
                });
            }

            // Shared by onAdd/onUpdate: waits for the deployment status check, then releases every
            // matching pending create event with the outcome. "action" is the verb for the log line.
            private void releaseCreateEventsWhenReady(V1Deployment deployment, String action) {
                String deploymentName = deployment.getMetadata().getName();
                String namespaceName = deployment.getMetadata().getNamespace();
                String wrappedDeployment = InformerUtils.wrap(namespaceName, deploymentName);
                List<InformerCreateEvent> createEvents = createEventContainers.entrySet().stream()
                    .filter(entry -> InformerEventType.DEPLOYMENT.equals(entry.getValue().getType()))
                    .filter(k -> k.getKey().equals(wrappedDeployment)).map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                createEvents.forEach(ice -> {
                    log.info("{} deployment {} in namespace {}!", deploymentName, action,
                        namespaceName);
                    // Ensure the deployment status is ready
                    checkDeploymentStatus(namespaceName, deploymentName).thenAccept(ignored -> {
                        ice.setStatus(InformerCreateEvent.CreateStatus.CREATE_SUCCESS);
                        ice.setResult(deployment);
                        ice.release();
                    }).exceptionally(exception -> {
                        String errMsg =
                            String.format("Check status for deployment [%s] in namespace [%s] error",
                                deploymentName, namespaceName);
                        // Keep the root cause visible; the event's exception has the message only.
                        log.error(errMsg, exception);
                        ice.setStatus(InformerCreateEvent.CreateStatus.CREATE_ERROR);
                        ice.setException(new InformerException(errMsg));
                        ice.release();
                        return null;
                    });
                });
            }

        };
    }

    /**
     * Returns the sync future of the config map informer, creating and starting the informer on
     * the first call.
     *
     * <p>The future is stored in {@code informerContainers} under
     * {@code InformerEventType.CONFIG_MAP.name()}; later calls receive that same future, whether
     * it completed normally or exceptionally.
     *
     * @return future completed when the sync callback reports {@code initializeComplete}, or
     *     completed exceptionally with the {@link InformerException} passed to
     *     {@code initializeFailed}
     */
    @Override
    public CompletableFuture<Void> getOrAddConfigMapInformer() {
        final String cacheKey = InformerEventType.CONFIG_MAP.name();
        return informerContainers.computeIfAbsent(cacheKey, key -> {
            final CompletableFuture<Void> synced = new CompletableFuture<>();

            // List/watch config maps of all namespaces.
            // NOTE(review): unlike the other informers in this class, no label selector is passed
            // here - confirm that watching every config map is intended.
            final SharedIndexInformer<V1ConfigMap> informer =
                sharedInformerFactory.sharedIndexInformerFor(
                    (CallGeneratorParams params) -> coreV1Api.listConfigMapForAllNamespacesCall(
                        null, null, null, null, null, null, params.resourceVersion,
                        params.timeoutSeconds, params.watch, null),
                    V1ConfigMap.class, V1ConfigMapList.class);
            informer.addEventHandler(createConfigMapHandler());

            // Handler is attached; now start every informer registered with the factory.
            sharedInformerFactory.startAllRegisteredInformers();

            // Bridge the config map informer's sync outcome onto the returned future.
            final InformerInitializeCallback bridge = new InformerInitializeCallback() {
                @Override
                public void initializeComplete() {
                    synced.complete(null);
                }

                @Override
                public void initializeFailed(InformerException e) {
                    synced.completeExceptionally(e);
                }
            };
            syncInformer(informer, V1ConfigMap.class, bridge);
            return synced;
        });
    }

    /**
     * Builds the event handler attached to the config map informer.
     *
     * <p>Add and delete notifications are matched against the pending events held in
     * {@code createEventContainers} / {@code deleteEventContainers}: an event is picked when its
     * type is {@code InformerEventType.CONFIG_MAP} and its map key equals
     * {@code InformerUtils.wrap(namespace, name)} of the config map. Every picked event is marked
     * successful, receives the config map as its result and is released. Updates are only logged
     * at debug level.
     *
     * @return handler to register on the config map informer
     */
    @SuppressWarnings("unchecked")
    private ResourceEventHandler<V1ConfigMap> createConfigMapHandler() {
        return new ResourceEventHandler<V1ConfigMap>() {
            @Override
            public void onAdd(V1ConfigMap v1ConfigMap) {
                final String name = v1ConfigMap.getMetadata().getName();
                final String lookupKey =
                    InformerUtils.wrap(v1ConfigMap.getMetadata().getNamespace(), name);
                final List<InformerCreateEvent> pending = createEventContainers.entrySet().stream()
                    .filter(e -> InformerEventType.CONFIG_MAP.equals(e.getValue().getType())
                        && e.getKey().equals(lookupKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                for (InformerCreateEvent event : pending) {
                    log.info("{} configmap added in namespace {}!", name,
                        v1ConfigMap.getMetadata().getNamespace());
                    event.setStatus(InformerCreateEvent.CreateStatus.CREATE_SUCCESS);
                    event.setResult(v1ConfigMap);
                    event.release();
                }
            }

            @Override
            public void onUpdate(V1ConfigMap oldConfigMap, V1ConfigMap newConfigMap) {
                final String before = oldConfigMap.getMetadata().getName();
                final String after = newConfigMap.getMetadata().getName();
                final String namespace = newConfigMap.getMetadata().getNamespace();
                log.debug("{} => {} configmap updated in namespace {}!", before, after, namespace);
            }

            @Override
            public void onDelete(V1ConfigMap v1ConfigMap, boolean deletedFinalStateUnknown) {
                final String name = v1ConfigMap.getMetadata().getName();
                final String lookupKey =
                    InformerUtils.wrap(v1ConfigMap.getMetadata().getNamespace(), name);
                final List<InformerDeleteEvent> pending = deleteEventContainers.entrySet().stream()
                    .filter(e -> InformerEventType.CONFIG_MAP.equals(e.getValue().getType())
                        && e.getKey().equals(lookupKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
                for (InformerDeleteEvent event : pending) {
                    log.info("{} configmap deleted in namespace {}!", name,
                        v1ConfigMap.getMetadata().getNamespace());
                    event.setStatus(InformerDeleteEvent.DeleteStatus.DELETE_SUCCESS);
                    event.setResult(v1ConfigMap);
                    event.release();
                }
            }

        };
    }

    /**
     * Stops every informer registered with the shared informer factory and discards all
     * tracked create and delete events.
     */
    @Override
    public void shutdown() {
        sharedInformerFactory.stopAllRegisteredInformers();
        // Drop whatever create/delete events are still tracked; they are cleared here
        // without being released.
        createEventContainers.clear();
        deleteEventContainers.clear();
    }

    /**
     * Polls the given informer until it reports that it has synced and notifies the callback
     * of the outcome.
     *
     * <p>The poll is a retryable action configured from
     * {@code KubernetesConfiguration.informerSyncRetryNumber} and
     * {@code KubernetesConfiguration.informerSyncRetryIntervalMs}.
     *
     * @param informer informer whose sync state is polled
     * @param apiTypeClass resource class; only its simple name is used, in messages
     * @param callback receives {@code initializeComplete()} when the informer synced, otherwise
     *     {@code initializeFailed(...)} with an {@link InformerException}
     * @param <ApiType> Kubernetes resource type handled by the informer
     */
    private <ApiType extends KubernetesObject> void syncInformer(
        final SharedIndexInformer<ApiType> informer, Class<ApiType> apiTypeClass,
        final InformerInitializeCallback callback) {
        final String resourceName = apiTypeClass.getSimpleName();
        // Flipped by the action's success hook; stays false if the action never succeeds.
        final AtomicBoolean synced = new AtomicBoolean(false);

        Actions.Action pollSyncState = Actions.Action.builder().actionName("waitForSyncedAction")
            .numRetries(KubernetesConfiguration.informerSyncRetryNumber)
            .sleepBetweenInvocationsMs(KubernetesConfiguration.informerSyncRetryIntervalMs)
            .supplier(() -> informer.hasSynced()
                ? Actions.ActionResult.builder().success(true).build()
                : Actions.ActionResult.builder().success(false)
                    .errorMsg(String.format("[%s] informers have not yet been synced", resourceName))
                    .build())
            .build();

        Actions.newBuilder()
            .addAction(pollSyncState.toBuilder().onSuccess(ignored -> synced.set(true)).build())
            .run();

        if (synced.get()) {
            log.info("[{}] informers synced successfully", resourceName);
            callback.initializeComplete();
            return;
        }
        callback.initializeFailed(new InformerException(
            String.format("Failed to wait for [%s] informers synced", resourceName)));
    }

    /**
     * Checks whether a statefulset held in the shared informer cache is ready.
     *
     * <p>The statefulset is reported ready when its spec replicas, status replicas and ready
     * replicas are all equal and its current revision equals its update revision. A missing
     * spec, status, replica count or revision is reported as "not ready" rather than failing
     * with a {@link NullPointerException}.
     *
     * @param namespace namespace of the statefulset
     * @param statefulSetName name of the statefulset
     * @return {@code true} if the cached statefulset is ready, {@code false} otherwise
     * @throws RuntimeException if the statefulset is not present in the informer cache
     */
    public boolean checkStatefulSetIsReady(String namespace, String statefulSetName) {
        SharedIndexInformer<V1StatefulSet> informer =
            sharedInformerFactory.getExistingSharedIndexInformer(V1StatefulSet.class);
        Lister<V1StatefulSet> lister = new Lister<>(informer.getIndexer());
        // need do this due "lister.get()" does not work: scan the cached list for the wrapped
        // namespace/name key. Stopping at the first match avoids building a map of every
        // cached statefulset on each poll, which also failed on any duplicate wrapped key.
        String wrapStatefulSetName = InformerUtils.wrap(namespace, statefulSetName);
        V1StatefulSet v1StatefulSet = lister.list().stream()
            .filter(ss -> wrapStatefulSetName.equals(
                InformerUtils.wrap(ss.getMetadata().getNamespace(), ss.getMetadata().getName())))
            .findFirst()
            .orElse(null);
        // maybe interrupted by mistaken namespace delete
        if (v1StatefulSet == null) {
            throw new RuntimeException(
                String.format("Statefulset %s in namespace %s has not been found in informer lister",
                    statefulSetName, namespace));
        }
        // Spec or status not populated on the cached object: it cannot be ready yet.
        if (v1StatefulSet.getSpec() == null || v1StatefulSet.getStatus() == null) {
            log.info("Statefulset [{}] in namespace [{}] has no spec or status yet",
                statefulSetName, namespace);
            return false;
        }

        Integer specReplicas = v1StatefulSet.getSpec().getReplicas();
        Integer statusReplicas = v1StatefulSet.getStatus().getReplicas();
        Integer readyReplicas = v1StatefulSet.getStatus().getReadyReplicas();
        String currentRevision = v1StatefulSet.getStatus().getCurrentRevision();
        String updateRevision = v1StatefulSet.getStatus().getUpdateRevision();
        log.info(
            "Current status of statefulset [{}] in namespace [{}] is spec replicas: {}, status replicas: {}, ready replicas: {}",
            statefulSetName, namespace, specReplicas, statusReplicas, readyReplicas);

        // statusReplicas is known non-null once it equals the non-null specReplicas.
        return specReplicas != null && specReplicas.equals(statusReplicas)
            && statusReplicas.equals(readyReplicas)
            && currentRevision != null && currentRevision.equals(updateRevision);
    }

    /**
     * Checks whether a deployment held in the shared informer cache is ready.
     *
     * <p>The deployment is reported ready when its spec replicas, status replicas and ready
     * replicas are all equal. A missing spec, status or replica count is reported as
     * "not ready" rather than failing with a {@link NullPointerException}.
     *
     * @param namespace namespace of the deployment
     * @param deployMentName name of the deployment
     * @return {@code true} if the cached deployment is ready, {@code false} otherwise
     * @throws RuntimeException if the deployment is not present in the informer cache
     */
    public boolean checkDeployMentIsReady(String namespace, String deployMentName) {
        SharedIndexInformer<V1Deployment> informer =
            sharedInformerFactory.getExistingSharedIndexInformer(V1Deployment.class);
        Lister<V1Deployment> lister = new Lister<>(informer.getIndexer());
        // need do this due "lister.get()" does not work: scan the cached list for the wrapped
        // namespace/name key. Stopping at the first match avoids building a map of every
        // cached deployment on each poll, which also failed on any duplicate wrapped key.
        String wrapDeployMentName = InformerUtils.wrap(namespace, deployMentName);
        V1Deployment v1DeployMent = lister.list().stream()
            .filter(deployment -> wrapDeployMentName.equals(InformerUtils.wrap(
                deployment.getMetadata().getNamespace(), deployment.getMetadata().getName())))
            .findFirst()
            .orElse(null);
        // maybe interrupted by mistaken namespace delete
        if (v1DeployMent == null) {
            throw new RuntimeException(
                String.format("DeployMent %s in namespace %s has not been found in informer lister",
                    deployMentName, namespace));
        }
        // Spec or status not populated on the cached object: it cannot be ready yet.
        if (v1DeployMent.getSpec() == null || v1DeployMent.getStatus() == null) {
            log.info("Deployment [{}] in namespace [{}] has no spec or status yet",
                deployMentName, namespace);
            return false;
        }

        Integer specReplicas = v1DeployMent.getSpec().getReplicas();
        Integer statusReplicas = v1DeployMent.getStatus().getReplicas();
        Integer readyReplicas = v1DeployMent.getStatus().getReadyReplicas();
        log.info(
            "Current status of deployment [{}] in namespace [{}] is spec replicas: {}, status replicas: {}, ready replicas: {}",
            deployMentName, namespace, specReplicas, statusReplicas, readyReplicas);

        // statusReplicas is known non-null once it equals the non-null specReplicas.
        return specReplicas != null && specReplicas.equals(statusReplicas)
            && statusReplicas.equals(readyReplicas);
    }

    /**
     * Repeatedly checks a statefulset for readiness and reports the outcome as a future.
     *
     * <p>The check is a retryable action configured from
     * {@code KubernetesConfiguration.informerCheckRetryNumber} and
     * {@code KubernetesConfiguration.informerCheckRetryIntervalMs}. The returned future is
     * already completed when this method returns.
     *
     * @param namespace namespace of the statefulset
     * @param statefulSetName name of the statefulset
     * @return a future completed with {@code null} when the statefulset became ready, or
     *     completed exceptionally with an {@link InformerException} otherwise
     */
    private CompletableFuture<Void> checkStatefulSetStatus(String namespace, String statefulSetName) {
        // Flipped by the action's success hook; stays false if the action never succeeds.
        final AtomicBoolean ready = new AtomicBoolean(false);

        Actions.Action pollReadiness =
            Actions.Action.builder().actionName("checkStatefulSetStatusAction")
                .numRetries(KubernetesConfiguration.informerCheckRetryNumber)
                .sleepBetweenInvocationsMs(KubernetesConfiguration.informerCheckRetryIntervalMs)
                .supplier(() -> checkStatefulSetIsReady(namespace, statefulSetName)
                    ? Actions.ActionResult.builder().success(true).build()
                    : Actions.ActionResult.builder().success(false)
                        .errorMsg(
                            String.format("Statefulset [%s] in namespace [%s] has not yet been ready",
                                statefulSetName, namespace))
                        .build())
                .build();

        Actions.newBuilder()
            .addAction(pollReadiness.toBuilder().onSuccess(ignored -> ready.set(true)).build())
            .run();

        if (ready.get()) {
            log.info("Statefulset [{}] in namespace [{}] is ready", statefulSetName, namespace);
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new InformerException(
            String.format("Failed to check status for statefulset [%s] in namespace [%s]",
                statefulSetName, namespace)));
        return failed;
    }

    /**
     * Repeatedly checks a deployment for readiness and reports the outcome as a future.
     *
     * <p>The check is a retryable action configured from
     * {@code KubernetesConfiguration.informerCheckRetryNumber} and
     * {@code KubernetesConfiguration.informerCheckRetryIntervalMs}. The returned future is
     * already completed when this method returns.
     *
     * @param namespace namespace of the deployment
     * @param deploymentName name of the deployment
     * @return a future completed with {@code null} when the deployment became ready, or
     *     completed exceptionally with an {@link InformerException} otherwise
     */
    private CompletableFuture<Void> checkDeploymentStatus(String namespace, String deploymentName) {
        // Flipped by the action's success hook; stays false if the action never succeeds.
        final AtomicBoolean ready = new AtomicBoolean(false);

        Actions.Action pollReadiness =
            Actions.Action.builder().actionName("checkDeploymentStatusAction")
                .numRetries(KubernetesConfiguration.informerCheckRetryNumber)
                .sleepBetweenInvocationsMs(KubernetesConfiguration.informerCheckRetryIntervalMs)
                .supplier(() -> checkDeployMentIsReady(namespace, deploymentName)
                    ? Actions.ActionResult.builder().success(true).build()
                    : Actions.ActionResult.builder().success(false)
                        .errorMsg(
                            String.format("Deployment [%s] in namespace [%s] has not yet been ready",
                                deploymentName, namespace))
                        .build())
                .build();

        Actions.newBuilder()
            .addAction(pollReadiness.toBuilder().onSuccess(ignored -> ready.set(true)).build())
            .run();

        if (ready.get()) {
            log.info("Deployment [{}] in namespace [{}] is ready", deploymentName, namespace);
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new InformerException(
            String.format("Failed to check status for deployment [%s] in namespace [%s]",
                deploymentName, namespace)));
        return failed;
    }

    /**
     * Callback notified about the outcome of an informer initialization.
     */
    interface InformerInitializeCallback {
        /**
         * Called when the informer has been initialized successfully.
         */
        void initializeComplete();

        /**
         * Called when the informer initialization failed.
         *
         * @param e the exception describing the failure
         */
        void initializeFailed(InformerException e);
    }

}
